package com.dahuan.transform;

import com.dahuan.bean.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Transform_KeyBy {
    /**
     * Reads sensor records from a text file, keys the stream by sensor id and
     * emits the running maximum temperature per sensor.
     *
     * <p>Input format: one CSV line per record — {@code id,timestamp,temperature}.
     *
     * @param args optional; {@code args[0]} overrides the default input file path
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 keeps the printed output in a single, deterministic stream for the demo.
        env.setParallelism(1);

        // Read input from a file; allow the hard-coded demo path to be overridden on the command line.
        String path = args.length > 0
                ? args[0]
                : "E:\\Project\\FlinkTutorials\\Flink-Scala\\src\\main\\resources\\sensor.txt";
        DataStreamSource<String> stringDataStreamSource = env.readTextFile(path);

        /*
         * DataStream -> KeyedStream: logically splits the stream into disjoint
         * partitions, each containing all elements with the same key
         * (implemented internally via hash partitioning).
         */

        // Parse each CSV line into a SensorReading.
        DataStream<SensorReading> dataStream = stringDataStreamSource.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String value) throws Exception {
                // Fields in the file are comma-separated.
                String[] split = value.split(",");
                // parseLong/parseDouble instead of the deprecated Long/Double constructors;
                // trim() tolerates stray whitespace around the numeric fields.
                return new SensorReading(split[0],
                        Long.parseLong(split[1].trim()),
                        Double.parseDouble(split[2].trim()));
            }
        });

        // Group the stream by the "id" field.
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
        // Rolling maximum of the temperature per key. Note: max() only updates the
        // aggregated field — use maxBy("temperature") to keep the whole record
        // (including the timestamp) of the element holding the current maximum.
        SingleOutputStreamOperator<SensorReading> temperature = keyedStream.max("temperature");

        temperature.print();
        env.execute("Transform_KeyBy");
    }
}
